# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE

import functools
import io
import os
import shutil
import sys
import traceback
from collections import deque
from collections.abc import (
	Callable,
	Container,
	Generator,
	Iterable,
	MutableMapping,
	MutableSequence,
	Sequence,
)
from email.header import decode_header, make_header
from email.utils import formataddr, parseaddr
from typing import Any, Generic, TypeAlias, TypedDict

from werkzeug.test import Client

from frappe.deprecation_dumpster import gzip_compress, gzip_decompress, make_esc

# utility functions like cint, int, flt, etc.
from frappe.utils.data import *
from frappe.utils.html_utils import sanitize_html

EMAIL_NAME_PATTERN = re.compile(r"[^A-Za-z0-9\u00C0-\u024F\/\_\' ]+")
EMAIL_STRING_PATTERN = re.compile(r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)")
NON_MD_HTML_PATTERN = re.compile(r"<p[\s]*>|<br[\s]*>")
HTML_TAGS_PATTERN = re.compile(r"\<[^>]*\>")
INCLUDE_DIRECTIVE_PATTERN = re.compile("""({% include ['"]([^'"]*)['"] %})""")
PHONE_NUMBER_PATTERN = re.compile(r"([0-9\ \+\_\-\,\.\*\#\(\)]){1,20}$")
PERSON_NAME_PATTERN = re.compile(r"^[\w][\w\'\-]*( \w[\w\'\-]*)*$")
WHITESPACE_PATTERN = re.compile(r"[\t\n\r]")
MULTI_EMAIL_STRING_PATTERN = re.compile(r'[,\n](?=(?:[^"]|"[^"]*")*$)')
EMAIL_MATCH_PATTERN = re.compile(
	r"[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?",
	re.IGNORECASE,
)

UNSET = object()

PropertyType: TypeAlias = property | functools.cached_property


if sys.version_info < (3, 11):

	def exception():
		_exc_type, exc_value, _exc_traceback = sys.exc_info()
		return exc_value

	sys.exception = exception


def get_fullname(user=None):
	"""get the full name (first name + last name) of the user from User"""
	if not user:
		user = frappe.session.user

	if not hasattr(frappe.local, "fullnames"):
		frappe.local.fullnames = {}

	if not frappe.local.fullnames.get(user):
		p = frappe.db.get_value("User", user, ["first_name", "last_name"], as_dict=True)
		if p:
			frappe.local.fullnames[user] = (
				" ".join(filter(None, [p.get("first_name"), p.get("last_name")])) or user
			)
		else:
			frappe.local.fullnames[user] = user

	return frappe.local.fullnames.get(user)


def get_email_address(user=None):
	"""get the email address of the user from User"""
	if not user:
		user = frappe.session.user

	return frappe.db.get_value("User", user, "email")


def get_formatted_email(user, mail=None):
	"""get Email Address of user formatted as: `John Doe <johndoe@example.com>`"""
	fullname = get_fullname(user)
	method = get_hook_method("get_sender_details")

	if method:
		sender_name, mail = method()
		# if method exists but sender_name is ""
		fullname = sender_name or fullname

	if not mail:
		mail = get_email_address(user) or validate_email_address(user)

	if not mail:
		return ""
	else:
		return cstr(make_header(decode_header(formataddr((fullname, mail)))))


def extract_email_id(email):
	"""fetch only the email part of the Email Address"""
	return cstr(parse_addr(email)[1])


def validate_phone_number_with_country_code(phone_number: str, fieldname: str) -> None:
	from phonenumbers import NumberParseException, is_valid_number, parse

	from frappe import _

	if not phone_number:
		return

	valid_number = False
	error_message = _("Phone Number {0} set in field {1} is not valid.")
	error_title = _("Invalid Phone Number")
	try:
		if valid_number := is_valid_number(parse(phone_number)):
			return True
	except NumberParseException as e:
		if e.error_type == NumberParseException.INVALID_COUNTRY_CODE:
			error_message = _("Please select a country code for field {1}.")
			error_title = _("Country Code Required")
	finally:
		if not valid_number:
			frappe.throw(
				error_message.format(frappe.bold(phone_number), frappe.bold(fieldname)),
				title=error_title,
				exc=frappe.InvalidPhoneNumberError,
			)


def validate_phone_number(phone_number, throw=False):
	"""Return True if valid phone number."""
	if not phone_number:
		return False

	phone_number = phone_number.strip()
	match = PHONE_NUMBER_PATTERN.match(phone_number)

	if not match and throw:
		frappe.throw(
			frappe._("{0} is not a valid Phone Number").format(phone_number), frappe.InvalidPhoneNumberError
		)

	return bool(match)


def validate_name(name, throw=False):
	"""Return True if the name is valid

	* valid names may have unicode and ascii characters, dash, quotes, numbers
	* anything else is considered invalid

	Note: "Name" here is name of a person, not the primary key in Frappe doctypes.
	"""
	if not name:
		return False

	name = name.strip()
	match = PERSON_NAME_PATTERN.match(name)

	if not match and throw:
		frappe.throw(frappe._("{0} is not a valid Name").format(name), frappe.InvalidNameError)

	return bool(match)


def validate_email_address(email_str, throw=False):
	"""Validates the email string"""
	email = email_str = (email_str or "").strip()

	def _check(e):
		_valid = True
		if not e:
			_valid = False

		if "undisclosed-recipient" in e:
			return False

		elif " " in e and "<" not in e:
			# example: "test@example.com test2@example.com" will return "test@example.comtest2" after parseaddr!!!
			_valid = False

		else:
			email_id = extract_email_id(e)
			match = EMAIL_MATCH_PATTERN.match(email_id) if email_id else None

			if not match:
				_valid = False

		if not _valid:
			if throw:
				invalid_email = frappe.utils.escape_html(e)
				frappe.throw(
					frappe._("{0} is not a valid Email Address").format(invalid_email),
					frappe.InvalidEmailAddressError,
				)
			return None
		else:
			return email_id

	out = []
	for e in email_str.split(","):
		if not e:
			continue
		email = _check(e.strip())
		if email:
			out.append(email)

	return ", ".join(out)


def split_emails(txt):
	email_list = []

	# emails can be separated by comma or newline
	s = WHITESPACE_PATTERN.sub(" ", cstr(txt))
	for email in MULTI_EMAIL_STRING_PATTERN.split(s):
		email = strip(cstr(email))
		if email:
			email_list.append(email)

	return email_list


def validate_url(
	txt: str,
	throw: bool = False,
	valid_schemes: str | Container[str] | None = None,
) -> bool:
	"""
	Return True if `txt` represents a valid URL.

	Args:
	        throw: throws a validationError if URL is not valid
	        valid_schemes: if provided checks the given URL's scheme against this
	"""
	url = urlparse(txt)
	is_valid = bool(url.scheme and (url.netloc or url.path)) or bool(txt and txt.startswith("/"))

	# Handle scheme validation
	if isinstance(valid_schemes, str):
		is_valid = is_valid and (url.scheme == valid_schemes)
	elif isinstance(valid_schemes, list | tuple | set):
		is_valid = is_valid and (url.scheme in valid_schemes)

	if not is_valid and throw:
		frappe.throw(frappe._("'{0}' is not a valid URL").format(frappe.bold(txt)))

	return is_valid


def random_string(length: int) -> str:
	"""generate a random string"""
	import string
	from random import choice

	return "".join(choice(string.ascii_letters + string.digits) for i in range(length))


def has_gravatar(email: str) -> str:
	"""Return gravatar url if user has set an avatar at gravatar.com."""
	import requests

	if frappe.flags.in_import or frappe.flags.in_install or frappe.in_test:
		# no gravatar if via upload
		# since querying gravatar for every item will be slow
		return ""

	gravatar_url = get_gravatar_url(email, "404")
	try:
		res = requests.get(gravatar_url, timeout=5)
		if res.status_code == 200:
			return gravatar_url
		else:
			return ""
	except requests.exceptions.RequestException:
		return ""


def get_gravatar_url(email: str, default: Literal["mm", "404"] = "mm") -> str:
	"""Return gravatar URL for the given email.

	If `default` is set to "404", gravatar URL will return 404 if no avatar is found.
	If `default` is set to "mm", a placeholder image will be returned.
	"""
	hexdigest = hashlib.md5(frappe.as_unicode(email).encode("utf-8"), usedforsecurity=False).hexdigest()
	return f"https://secure.gravatar.com/avatar/{hexdigest}?d={default}&s=200"


def get_gravatar(email: str) -> str:
	"""Return gravatar URL if user has set an avatar at gravatar.com.

	Else return identicon image (base64)."""
	from frappe.utils.identicon import Identicon

	return has_gravatar(email) or Identicon(email).base64()


def get_traceback(with_context: bool = False) -> str:
	"""Return the traceback of the Exception."""
	from traceback_with_variables import iter_exc_lines

	exc_type, exc_value, exc_tb = sys.exc_info()

	if not any([exc_type, exc_value, exc_tb]):
		return ""

	if with_context:
		trace_list = iter_exc_lines(fmt=_get_traceback_sanitizer())
		tb = "\n".join(trace_list)
	else:
		trace_list = traceback.format_exception(exc_type, exc_value, exc_tb)
		tb = "".join(cstr(t) for t in trace_list)

	bench_path = get_bench_path() + "/"
	return tb.replace(bench_path, "")


@functools.lru_cache(maxsize=1)
def _get_traceback_sanitizer():
	from traceback_with_variables import Format

	blocklist = [
		"password",
		"passwd",
		"secret",
		"token",
		"key",
		"pwd",
	]

	placeholder = "********"

	def dict_printer(v: dict) -> str:
		from copy import deepcopy

		v = deepcopy(v)
		for key in blocklist:
			if key in v:
				v[key] = placeholder

		return str(v)

	# Adapted from https://github.com/andy-landy/traceback_with_variables/blob/master/examples/format_customized.py
	# Reused under MIT license: https://github.com/andy-landy/traceback_with_variables/blob/master/LICENSE

	return Format(
		custom_var_printers=[
			# redact variables
			*[(variable_name, lambda *a, **kw: placeholder) for variable_name in blocklist],
			# redact dictionary keys
			(["_secret", dict, lambda *a, **kw: False], dict_printer),
			(["_secret", frappe._dict, lambda *a, **kw: False], dict_printer),
		],
	)


def log(event, details):
	frappe.logger(event).info(details)


def dict_to_str(args: dict[str, Any], sep: str = "&") -> str:
	"""Convert a dictionary to URL."""
	return sep.join(f"{k!s}=" + quote(str(args[k] or "")) for k in list(args))


def list_to_str(seq, sep=", "):
	"""Convert a sequence into a string using seperator.

	Same as str.join, but does type conversion and strip extra spaces.
	"""
	return sep.join(map(str.strip, map(str, seq)))


# Get Defaults
# ==============================================================================


def get_defaults(key=None):
	"""
	Get dictionary of default values from the defaults, or a value if key is passed
	"""
	return frappe.db.get_defaults(key)


def set_default(key, val):
	"""
	Set / add a default value to defaults`
	"""
	return frappe.db.set_default(key, val)


def remove_blanks(d: dict) -> dict:
	"""Return d with empty ('' or None) values stripped. Mutates inplace."""
	for k, v in tuple(d.items()):
		if not v:
			del d[k]
	return d


def strip_html_tags(text: str) -> str:
	"""Remove html tags from the given `text`."""
	return HTML_TAGS_PATTERN.sub("", text)


def get_file_timestamp(fn):
	"""Return timestamp of the given file."""
	from frappe.utils import cint

	try:
		return str(cint(os.stat(fn).st_mtime))
	except OSError as e:
		if e.args[0] != 2:
			raise
		else:
			return None


# esc / unescape characters -- used for command line
def esc(s, esc_chars):
	"""
	Escape special characters
	"""
	if not s:
		return ""
	for c in esc_chars:
		esc_str = "\\" + c
		s = s.replace(c, esc_str)
	return s


def unesc(s, esc_chars):
	"""
	UnEscape special characters
	"""
	for c in esc_chars:
		esc_str = "\\" + c
		s = s.replace(esc_str, c)
	return s


def execute_in_shell(cmd, verbose=False, low_priority=False, check_exit_code=False):
	# using Popen instead of os.system - as recommended by python docs
	import shlex
	import tempfile
	from subprocess import Popen

	if isinstance(cmd, list):
		# ensure it's properly escaped; only a single string argument executes via shell
		cmd = shlex.join(cmd)

	with tempfile.TemporaryFile() as stdout, tempfile.TemporaryFile() as stderr:
		kwargs = {
			"shell": True,
			"stdout": stdout,
			"stderr": stderr,
			"executable": shutil.which("bash") or "/bin/bash",
		}

		if low_priority:
			kwargs["preexec_fn"] = lambda: os.nice(10)

		p = Popen(cmd, **kwargs)
		exit_code = p.wait()

		stdout.seek(0)
		out = stdout.read()

		stderr.seek(0)
		err = stderr.read()

	failed = check_exit_code and exit_code

	if verbose or failed:
		if err:
			print(err)
		if out:
			print(out)

	if failed:
		raise frappe.CommandFailedError(
			"Command failed", out.decode(errors="replace"), err.decode(errors="replace")
		)

	return err, out


def get_path(*path, **kwargs):
	base = kwargs.get("base")
	if not base:
		base = frappe.local.site_path
	return os.path.join(base, *path)


def get_site_base_path():
	return frappe.local.site_path


def get_site_path(*path):
	return get_path(*path, base=get_site_base_path())


def get_files_path(*path, **kwargs):
	return get_site_path("private" if kwargs.get("is_private") else "public", "files", *path)


def get_bench_path():
	return os.environ.get("FRAPPE_BENCH_ROOT") or os.path.realpath(
		os.path.join(os.path.dirname(frappe.__file__), "..", "..", "..")
	)


def get_bench_id():
	return frappe.get_conf().get("bench_id", get_bench_path().strip("/").replace("/", "-"))


def get_site_id(site=None):
	return f"{site or frappe.local.site}@{get_bench_id()}"


def get_backups_path():
	return get_site_path("private", "backups")


def get_request_site_address(full_address=False):
	return get_url(full_address=full_address)


def get_site_url(site):
	conf = frappe.get_conf(site)
	if conf.host_name:
		return conf.host_name
	return f"http://{site}:{conf.webserver_port}"


def encode_dict(d, encoding="utf-8"):
	for key in d:
		if isinstance(d[key], str) and isinstance(d[key], str):
			d[key] = d[key].encode(encoding)

	return d


def decode_dict(d, encoding="utf-8"):
	for key in d:
		if isinstance(d[key], str) and not isinstance(d[key], str):
			d[key] = d[key].decode(encoding, "ignore")
	return d


@functools.lru_cache
def get_site_name(hostname):
	return hostname.split(":", 1)[0]


def get_disk_usage():
	"""get disk usage of files folder"""
	files_path = get_files_path()
	if not os.path.exists(files_path):
		return 0
	err, out = execute_in_shell(f"du -hsm {files_path}")
	return cint(out.split("\n")[-2].split("\t")[0])


def touch_file(path):
	with open(path, "a"):
		os.utime(path, None)
	return path


def get_test_client(use_cookies=True) -> Client:
	"""Return an test instance of the Frappe WSGI."""
	from frappe.app import application

	return Client(application, use_cookies=use_cookies)


def get_hook_method(hook_name, fallback=None):
	method = frappe.get_hooks().get(hook_name)
	if method:
		method = frappe.get_attr(method[0])
		return method
	if fallback:
		return fallback


def call_hook_method(hook, *args, **kwargs):
	out = None
	for method_name in frappe.get_hooks(hook):
		out = out or frappe.get_attr(method_name)(*args, **kwargs)

	return out


def is_cli() -> bool:
	"""Return True if current instance is being run via a terminal."""
	invoked_from_terminal = False
	try:
		invoked_from_terminal = bool(os.get_terminal_size())
	except Exception:
		invoked_from_terminal = sys.stdin and sys.stdin.isatty()
	return invoked_from_terminal


def update_progress_bar(txt, i, l, absolute=False):
	if os.environ.get("CI"):
		if i == 0:
			sys.stdout.write(txt)

		sys.stdout.write(".")
		sys.stdout.flush()
		return

	if not getattr(frappe.local, "request", None) or is_cli():  # pragma: no cover
		lt = len(txt)
		try:
			col = 40 if os.get_terminal_size().columns > 80 else 20
		except OSError:
			# in case function isn't being called from a terminal
			col = 40

		if lt < 36:
			txt = txt + " " * (36 - lt)

		complete = int(float(i + 1) / l * col)
		completion_bar = ("=" * complete).ljust(col, " ")
		percent_complete = f"{int(float(i + 1) / l * 100)!s}%"
		status = f"{i} of {l}" if absolute else percent_complete
		sys.stdout.write(f"\r{txt}: [{completion_bar}] {status}")
		sys.stdout.flush()


def get_html_format(print_path):
	html_format = None
	if os.path.exists(print_path):
		with open(print_path) as f:
			html_format = f.read()

		for include_directive, path in INCLUDE_DIRECTIVE_PATTERN.findall(html_format):
			for app_name in frappe.get_installed_apps():
				include_path = frappe.get_app_path(app_name, *path.split(os.path.sep))
				if os.path.exists(include_path):
					with open(include_path) as f:
						html_format = html_format.replace(include_directive, f.read())
					break

	return html_format


def is_markdown(text):
	if "<!-- markdown -->" in text:
		return True
	elif "<!-- html -->" in text:
		return False
	else:
		return not NON_MD_HTML_PATTERN.search(text)


def is_a_property(x) -> bool:
	"""Get properties (@property, @cached_property) in a controller class"""
	return isinstance(x, PropertyType)


def get_sites(sites_path=None):
	if not sites_path:
		sites_path = getattr(frappe.local, "sites_path", None) or "."

	sites = []
	for site in os.listdir(sites_path):
		path = os.path.join(sites_path, site)

		if (
			os.path.isdir(path)
			and not os.path.islink(path)
			and os.path.exists(os.path.join(path, "site_config.json"))
		):
			# is a dir and has site_config.json
			sites.append(site)

	return sorted(sites)


def get_request_session(max_retries=5):
	import requests
	from requests.adapters import HTTPAdapter, Retry

	session = requests.Session()
	http_adapter = HTTPAdapter(max_retries=Retry(total=max_retries, status_forcelist=[500]))

	session.mount("http://", http_adapter)
	session.mount("https://", http_adapter)

	return session


def markdown(text, sanitize=True, linkify=True):
	html = text if is_html(text) else frappe.utils.md_to_html(text)

	if sanitize:
		html = html.replace("<!-- markdown -->", "")
		html = sanitize_html(html, linkify=linkify)

	return html


def sanitize_email(emails):
	sanitized = []
	for e in split_emails(emails):
		if not validate_email_address(e):
			continue

		full_name, email_id = parse_addr(e)
		sanitized.append(formataddr((full_name, email_id)))

	return ", ".join(sanitized)


def parse_addr(email_string):
	"""
	Return email_id and user_name based on email string
	Raise error if email string is not valid
	"""
	name, email = parseaddr(email_string)
	if check_format(email):
		name = get_name_from_email_string(email_string, email, name)
		return (name, email)
	else:
		email_list = EMAIL_STRING_PATTERN.findall(email_string)
		if len(email_list) > 0 and check_format(email_list[0]):
			# take only first email address
			email = email_list[0]
			name = get_name_from_email_string(email_string, email, name)
			return (name, email)
	return (None, email)


def check_format(email_id):
	"""
	Check if email_id is valid. valid email:text@example.com
	String check ensures that email_id contains both '.' and
	'@' and index of '@' is less than '.'
	"""
	is_valid = False
	try:
		pos = email_id.rindex("@")
		is_valid = pos > 0 and (email_id.rindex(".") > pos) and (len(email_id) - pos > 4)
	except Exception:
		# print(e)
		pass
	return is_valid


def get_name_from_email_string(email_string, email_id, name):
	name = email_string.replace(email_id, "")
	name = EMAIL_NAME_PATTERN.sub("", name).strip()
	if not name:
		name = email_id
	return name


def get_installed_apps_info():
	out = []
	from frappe.utils.change_log import get_versions

	out.extend(
		{
			"app_name": app,
			"version": version_details.get("branch_version") or version_details.get("version"),
			"branch": version_details.get("branch"),
		}
		for app, version_details in get_versions().items()
	)
	return out


def get_site_info():
	from frappe.email.queue import get_emails_sent_this_month
	from frappe.utils.user import get_system_managers

	# only get system users
	users = frappe.get_all(
		"User",
		filters={"user_type": "System User", "name": ("not in", frappe.STANDARD_USERS)},
		fields=["name", "enabled", "last_login", "last_active", "language", "time_zone"],
	)
	system_managers = get_system_managers(only_name=True)
	for u in users:
		# tag system managers
		u.is_system_manager = 1 if u.name in system_managers else 0
		u.full_name = get_fullname(u.name)
		u.email = u.name
		del u["name"]

	system_settings = frappe.db.get_singles_dict("System Settings")
	space_usage = frappe._dict((frappe.local.conf.limits or {}).get("space_usage", {}))

	kwargs = {
		"fields": ["user", "creation", "full_name"],
		"filters": {"operation": "Login", "status": "Success"},
		"limit": "10",
	}

	site_info = {
		"installed_apps": get_installed_apps_info(),
		"users": users,
		"country": system_settings.country,
		"language": system_settings.language or "english",
		"time_zone": system_settings.time_zone,
		"setup_complete": frappe.is_setup_complete(),
		"scheduler_enabled": system_settings.enable_scheduler,
		# usage
		"emails_sent": get_emails_sent_this_month(),
		"space_used": flt((space_usage.total or 0) / 1024.0, 2),
		"database_size": space_usage.database_size,
		"backup_size": space_usage.backup_size,
		"files_size": space_usage.files_size,
		"last_logins": frappe.get_all("Activity Log", **kwargs),
	}

	# from other apps
	for method_name in frappe.get_hooks("get_site_info"):
		site_info.update(frappe.get_attr(method_name)(site_info) or {})

	# dumps -> loads to prevent datatype conflicts
	return json.loads(frappe.as_json(site_info))


def parse_json(val: str):
	"""
	Parses json if string else return
	"""
	if isinstance(val, str):
		val = json.loads(val)
	if isinstance(val, dict):
		val = frappe._dict(val)
	return val


def get_db_count(*args):
	"""
	Pass a doctype or a series of doctypes to get the count of docs in them.

	Parameters:
	        *args: Variable length argument list of doctype names whose doc count you need

	Return:
	        dict: A dict with the count values.

	Example:
	        via terminal:
	                bench --site erpnext.local execute frappe.utils.get_db_count --args "['DocType', 'Communication']"
	"""
	db_count = {}
	for doctype in args:
		db_count[doctype] = frappe.db.count(doctype)

	return json.loads(frappe.as_json(db_count))


def call(fn, *args, **kwargs):
	"""
	Pass a doctype or a series of doctypes to get the count of docs in them
	Parameters:
	        fn: frappe function to be called

	Return:
	        based on the function you call: output of the function you call

	Example:
	        via terminal:
	                bench --site erpnext.local execute frappe.utils.call --args '''["frappe.get_all", "Activity Log"]''' --kwargs '''{"fields": ["user", "creation", "full_name"], "filters":{"Operation": "Login", "Status": "Success"}, "limit": "10"}'''
	"""
	return json.loads(frappe.as_json(frappe.call(fn, *args, **kwargs)))


def get_safe_filters(filters):
	try:
		filters = json.loads(filters)

		if isinstance(filters, int | float):
			filters = frappe.as_unicode(filters)

	except (TypeError, ValueError):
		# filters are not passed, not json
		pass

	return filters


def create_batch(iterable: Iterable, size: int) -> Generator[Iterable, None, None]:
	"""Convert an iterable to multiple batches of constant size of batch_size.

	Args:
	        iterable (Iterable): Iterable object which is subscriptable
	        size (int): Maximum size of batches to be generated

	Yields:
	        Generator[List]: Batched iterable of maximum length `size`
	"""
	total_count = len(iterable)
	for i in range(0, total_count, size):
		yield iterable[i : min(i + size, total_count)]


def set_request(**kwargs):
	from werkzeug.test import EnvironBuilder
	from werkzeug.wrappers import Request

	builder = EnvironBuilder(**kwargs)
	frappe.local.request = Request(builder.get_environ())


def get_html_for_route(route):
	from frappe.website.serve import get_response

	set_request(method="GET", path=route)
	response = get_response()
	return frappe.safe_decode(response.get_data())


def get_file_size(path, format=False):
	num = os.path.getsize(path)

	if not format:
		return num

	suffix = "B"

	for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
		if abs(num) < 1024:
			return f"{num:3.1f}{unit}{suffix}"
		num /= 1024

	return "{:.1f}{}{}".format(num, "Yi", suffix)


def get_build_version():
	try:
		return str(os.path.getmtime(os.path.join(frappe.local.sites_path, "assets/assets.json")))
	except OSError:
		# .build can sometimes not exist
		# this is not a major problem so send fallback
		return frappe.utils.random_string(8)


def get_assets_json():
	def _get_assets():
		# get merged assets.json and assets-rtl.json
		assets = frappe.parse_json(frappe.read_file("assets/assets.json"))

		if assets_rtl := frappe.read_file("assets/assets-rtl.json"):
			assets.update(frappe.parse_json(assets_rtl))

		return assets

	if not frappe.conf.developer_mode:
		return frappe.client_cache.get_value(
			"assets_json",
			shared=True,
			generator=_get_assets,
		)

	else:
		return _get_assets()


def get_bench_relative_path(file_path):
	"""Fix paths relative to the bench root directory if exists and return the absolute path.

	Args:
	        file_path (str, Path): Path of a file that exists on the file system

	Return:
	        str: Absolute path of the file_path
	"""
	if not os.path.exists(file_path):
		base_path = ".."
	elif file_path.startswith(os.sep):
		base_path = os.sep
	else:
		base_path = "."

	file_path = os.path.join(base_path, file_path)

	if not os.path.exists(file_path):
		print(f"Invalid path {file_path[3:]}")
		sys.exit(1)

	return os.path.abspath(file_path)


def groupby_metric(iterable: dict[str, list], key: str):
	"""Group records by a metric.

	Usecase: Lets assume we got country wise players list with the ranking given for each player(multiple players in a country can have same ranking aswell).
	We can group the players by ranking(can be any other metric) using this function.

	>>> d = {
	        'india': [{'id':1, 'name': 'iplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'iplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'iplayer-3'}],
	        'Aus': [{'id':1, 'name': 'aplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'aplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'aplayer-3'}]
	}
	>>> groupby(d, key="ranking")
	{1: {'Aus': [{'id': 1, 'name': 'aplayer-1', 'ranking': 1},
	                        {'id': 2, 'name': 'aplayer-2', 'ranking': 1}],
	        'india': [{'id': 1, 'name': 'iplayer-1', 'ranking': 1},
	                        {'id': 2, 'name': 'iplayer-2', 'ranking': 1}]},
	2: {'Aus': [{'id': 2, 'name': 'aplayer-3', 'ranking': 2}],
	        'india': [{'id': 2, 'name': 'iplayer-3', 'ranking': 2}]}}
	"""
	records = {}
	for category, items in iterable.items():
		for item in items:
			records.setdefault(item[key], {}).setdefault(category, []).append(item)
	return records


def get_table_name(table_name: str, wrap_in_backticks: bool = False) -> str:
	name = f"tab{table_name}" if not table_name.startswith("__") else table_name

	if wrap_in_backticks:
		return f"`{name}`"

	return name


def squashify(what):
	if isinstance(what, Sequence) and len(what) == 1:
		return what[0]

	return what


def safe_json_loads(*args):
	results = []

	for arg in args:
		try:
			arg = json.loads(arg)
		except Exception:
			pass

		results.append(arg)

	return squashify(results)


def dictify(arg):
	if isinstance(arg, MutableSequence):
		for i, a in enumerate(arg):
			arg[i] = dictify(a)
	elif isinstance(arg, MutableMapping):
		arg = frappe._dict(arg)

	return arg


class _UserInfo(TypedDict):
	fullname: str
	image: str
	name: str
	email: str
	time_zone: str


def add_user_info(user: str | list[str] | set[str], user_info: dict[str, _UserInfo]) -> None:
	if not user:
		return

	if isinstance(user, str):
		user = [user]

	missing_users = [u for u in user if u not in user_info]
	if not missing_users:
		return

	missing_info = frappe.get_all(
		"User",
		{"name": ("in", missing_users)},
		["full_name", "user_image", "name", "email", "time_zone"],
	)

	for info in missing_info:
		user_info.setdefault(info.name, frappe._dict()).update(
			fullname=info.full_name or info.name,
			image=info.user_image,
			name=info.name,
			email=info.email,
			time_zone=info.time_zone,
		)


def is_git_url(url: str) -> bool:
	# modified to allow without the tailing .git from https://github.com/jonschlinkert/is-git-url.git
	pattern = r"(?:git|ssh|https?|\w*@[-\w.]+):(\/\/)?(.*?)(\.git)?(\/?|\#[-\d\w._]+?)$"
	return bool(re.match(pattern, url))


class CallbackManager:
	"""Manage callbacks.

	```
	# Capture callacks
	callbacks = CallbackManager()

	# Put a function call in queue
	callbacks.add(func)

	# Run all pending functions in queue
	callbacks.run()

	# Reset queue
	callbacks.reset()
	```

	Example usage: frappe.db.after_commit
	"""

	__slots__ = ("_functions",)

	def __init__(self) -> None:
		self._functions = deque()

	def add(self, func: Callable) -> None:
		"""Add a function to queue, functions are executed in order of addition."""
		self._functions.append(func)

	def __call__(self, func: Callable) -> None:
		self.add(func)

	def run(self):
		"""Run all functions in queue"""
		while self._functions:
			_func = self._functions.popleft()
			_func()

	def reset(self):
		self._functions.clear()


def safe_eval(code, eval_globals=None, eval_locals=None):
	"""A safer `eval`"""

	from frappe.utils.safe_exec import safe_eval

	return safe_eval(code, eval_globals, eval_locals)


cached_property = functools.cached_property
if sys.version_info.minor < 12:
	T = TypeVar("T")

	class cached_property(functools.cached_property, Generic[T]):
		"""
		A simpler `functools.cached_property` implementation without locks.
		This isn't needed in Python 3.12+, since lock was removed in newer versions.
		Hence, in those versions, it returns the `functools.cached_property` object.

		This does not prevent a possible race condition in multi-threaded usage.
		The getter function could run more than once on the same instance,
		with the latest run setting the cached value. If the cached property is
		idempotent or otherwise not harmful to run more than once on an instance,
		this is fine. If synchronization is needed, implement the necessary locking
		inside the decorated getter function or around the cached property access.
		"""

		def __init__(self, func: Callable[[Any], T]):
			self.func = func
			self.attrname = None
			self.__doc__ = func.__doc__
			self.__module__ = func.__module__

		def __set_name__(self, owner, name):
			if self.attrname is None:
				self.attrname = name

			elif name != self.attrname:
				raise TypeError(
					"Cannot assign the same cached_property to two different names "
					f"({self.attrname!r} and {name!r})."
				)

		def __get__(self, instance, owner=None) -> T:
			if instance is None:
				return self

			value = self.func(instance)
			instance.__dict__[self.attrname] = value
			return value
# end: custom cached_property implementation
